#include "config.h"

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <sys/resource.h>
#include <unistd.h>
#include <unistd.h>
#include <stdint.h>
#include <string.h>
#include <semaphore.h>
#include <poll.h> 
#include <pthread.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "ynet_rpc.h"
#include "cluster.h"
#include "job_dock.h"
#include "sysy_lib.h"
#include "replica.h"
#include "net_global.h"
#include "ynet_rpc.h"
#include "corerpc.h"
#include "core.h"
#include "rpc_proto.h"
#include "dbg.h"

extern net_global_t ng;

/*
 * Opcodes carried in msg_t.op for MSG_REPLICA requests.
 *
 * Values are assigned implicitly starting at REPLICA_NULL (400), and the
 * server dispatches on the numeric value it receives from the peer, so the
 * order of entries is effectively part of the wire protocol: reordering or
 * inserting in the middle would change the numbers other nodes send.
 * REPLICA_MAX - REPLICA_NULL sizes the handler tables; opcodes with no
 * handler registered in replica_rpc_init() are answered with ENOSYS.
 */
typedef enum {
        REPLICA_NULL = 400,
        REPLICA_CREATE,             /* 401: first real opcode */
        REPLICA_WRITE,
        REPLICA_READ,
        REPLICA_SHA1,
        REPLICA_CONNECT,
        REPLICA_PUSH,
        REPLICA_UNLINK,
        REPLICA_CLEANUP,

        REPLICA_SETPARENT,
        REPLICA_GETPARENT,
        REPLICA_GETCLOCK,
        REPLICA_SETCLOCK,
        REPLICA_ANALYSIS_UPDATE,
        REPLICA_SETTIER,
        REPLICA_GETTIER,
        REPLICA_SETPRIORITY,
        REPLICA_GETPRIORITY,
        REPLICA_ONLINE,
        REPLICA_MAX,
} replica_op_t;

/*
 * Fixed header of every MSG_REPLICA request, followed by an opaque payload.
 *
 * op     - a replica_op_t opcode.
 * buflen - payload length. Only some senders fill it in (write, create,
 *          push, sha1); the write/create/push servers trust it, while the
 *          other servers derive the length from the message size instead.
 * chkid  - chunk the request applies to.
 * buf    - payload produced by _opaque_encode*() / read by _opaque_decode().
 *
 * A C99 flexible array member replaces the GNU zero-length array; the
 * header layout and sizeof(msg_t) are unchanged.
 */
typedef struct {
        uint32_t op;
        uint32_t buflen;
        chkid_t  chkid;
        char buf[];
} msg_t;

/*
 * Dispatch tables indexed by (op - REPLICA_NULL), filled by
 * __request_set_handler(). Being static, unregistered slots stay NULL /
 * empty-string.
 */
static __request_handler_func__  __request_handler__[REPLICA_MAX - REPLICA_NULL];
static char  __request_name__[REPLICA_MAX - REPLICA_NULL][__RPC_HANDLER_NAME__ ];

/*
 * Register @func as the handler for opcode @op, under the display name
 * @name (set as the task name while the handler runs).
 *
 * @op must be a real opcode strictly between REPLICA_NULL and REPLICA_MAX,
 * and @name must be shorter than __RPC_HANDLER_NAME__ - 1 characters; both
 * are asserted because a violation would corrupt the static tables.
 */
static void __request_set_handler(int op, __request_handler_func__ func, const char *name)
{
        YASSERT(op > REPLICA_NULL && op < REPLICA_MAX);
        YASSERT(strlen(name) + 1 < __RPC_HANDLER_NAME__ );

        strcpy(__request_name__[op - REPLICA_NULL], name);
        __request_handler__[op - REPLICA_NULL] = func;
}

/*
 * Look up the handler and display name registered for @op.
 *
 * @op is taken from the peer's msg_t.op, so it is range-checked before it is
 * used as an index. An out-of-range opcode yields *func == NULL and
 * *name == NULL, which __request_handler() answers with ENOSYS, exactly as
 * for an in-range opcode that has no handler.
 */
static void __request_get_handler(int op, __request_handler_func__ *func, const char **name)
{
        if (op <= REPLICA_NULL || op >= REPLICA_MAX) {
                *func = NULL;
                *name = NULL;
                return;
        }

        *func = __request_handler__[op - REPLICA_NULL];
        *name = __request_name__[op - REPLICA_NULL];
}

/*
 * Copy the complete request in @buf (asserted to be at most
 * MEM_CACHE_SIZE4K bytes) into the caller-provided scratch area @_buf.
 * On return *_req points at the copied msg_t header and *buflen holds the
 * number of payload bytes that follow the header.
 */
static void __getmsg(buffer_t *buf, msg_t **_req, int *buflen, char *_buf)
{
        msg_t *msg = (msg_t *)_buf;

        YASSERT(buf->len <= MEM_CACHE_SIZE4K);

        *buflen = buf->len - sizeof(*msg);
        mbuffer_get(buf, msg, buf->len);
        *_req = msg;
}

/*
 * Server side of REPLICA_GETCLOCK: decode the requesting master's nid,
 * query replica_srv_getclock() for the chunk and reply with the resulting
 * clockstat_t. Failures are logged and returned to the dispatcher.
 */
static int __replica_srv_getclock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *master;
        clockstat_t clockstat;

        __getmsg(_buf, &msg, &payload_len, scratch);
        _opaque_decode(msg->buf, payload_len, &master, NULL, NULL);

        ret = replica_srv_getclock(master, &msg->chkid, &clockstat);
        if (unlikely(ret)) {
                DWARN("getclock "CHKID_FORMAT" ret %u peer %s\n",
                      CHKID_ARG(&msg->chkid), ret,
                      network_rname(master));
                GOTO(out, ret);
        }

        rpc_reply(sockid, msgid, &clockstat, sizeof(clockstat));
out:
        /* ret is 0 here on success */
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Server side of REPLICA_SHA1: decode the requested version, compute the
 * checksum via replica_srv_sha1() and reply with the result as a
 * NUL-terminated string (strlen + 1 bytes).
 *
 * The request is copied into a fixed on-stack buffer, and its size comes
 * from the peer, so oversized messages are rejected with EMSGSIZE instead
 * of overflowing the stack.
 */
static int __replica_srv_sha1(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char buf[MAX_MSG_SIZE], result[MAX_MSG_SIZE];
        uint64_t *version;

        if (unlikely(_buf->len > sizeof(buf))) {
                ret = EMSGSIZE;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &version, NULL, NULL);

        ret = replica_srv_sha1(&req->chkid, *version, result);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, result, strlen(result) + 1);

        return 0;
err_ret:
        return ret;
}

/*
 * Server side of REPLICA_GETPARENT: look up the parent of req->chkid via
 * replica_srv_getparent() and reply with it. The request carries no payload.
 *
 * The whole message is copied into a PAGE_SIZE scratch buffer, and its
 * length is peer-controlled, so anything larger is rejected with EMSGSIZE
 * instead of overflowing the buffer.
 */
static int __replica_srv_getparent(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        chkid_t parent;

        if (unlikely(_buf->len > PAGE_SIZE)) {
                ret = EMSGSIZE;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;
        mbuffer_get(_buf, req, _buf->len);

        ret = replica_srv_getparent(&req->chkid, &parent, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, &parent, sizeof(parent));

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_SETPARENT: decode the new parent chkid from the
 * payload and apply it with replica_srv_setparent(); replies with no body.
 *
 * The whole message is copied into a PAGE_SIZE scratch buffer, and its
 * length is peer-controlled, so anything larger is rejected with EMSGSIZE
 * instead of overflowing the buffer.
 */
static int __replica_srv_setparent(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const chkid_t *parent;

        if (unlikely(_buf->len > PAGE_SIZE)) {
                ret = EMSGSIZE;
                GOTO(err_ret, ret);
        }

        req = (void *)buf;

        buflen = _buf->len - sizeof(*req);
        mbuffer_get(_buf, req, _buf->len);

        _opaque_decode(req->buf, buflen, &parent, NULL, NULL);

        ret = replica_srv_setparent(&req->chkid, parent);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rpc_reply(sockid, msgid, NULL, 0);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_READ: decode the reader's nid and the io_t
 * descriptor, read the data with replica_srv_read() into a fresh buffer and
 * send it back on the transport the request arrived on (corenet or rpc).
 *
 * The reply buffer is released on both paths; previously it leaked whenever
 * replica_srv_read() failed after appending data to it.
 */
static int __replica_srv_read(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const io_t *io;
        buffer_t reply;
        const nid_t *reader;

        __getmsg(_buf, &req, &buflen, buf);

        _opaque_decode(req->buf, buflen, &reader, NULL, &io, NULL, NULL);

        mbuffer_init(&reply, 0);

        ret = replica_srv_read(reader, io, &reply);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if (sockid->type == SOCKID_CORENET) {
                corerpc_reply1(sockid, msgid, &reply);
        } else {
                rpc_reply1(sockid, msgid, &reply);
        }

        mbuffer_free(&reply);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_free:
        /* drop whatever replica_srv_read() may have put in reply before failing */
        mbuffer_free(&reply);
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Client side of REPLICA_READ: ask the replica on @nid to read @io and
 * receive the data into @_buf.
 *
 * Raw chunks go over corerpc, with io->size as the expected reply length;
 * every other chunk type goes over the plain rpc path. EINVAL from corerpc
 * is treated as a programming error (asserted).
 */
int replica_rpc_read(const nid_t *nid, const io_t *io, buffer_t *_buf)
{
        int ret;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)scratch;
        uint32_t payload;

        ANALYSIS_BEGIN(0);

        YASSERT(io->offset <= LICH_CHK_LEN_MAX);

        msg->op = REPLICA_READ;
        msg->chkid = io->id;
        _opaque_encode(&msg->buf, &payload, net_getnid(), sizeof(nid_t), io,
                       sizeof(*io), NULL);

        if (io->id.type != __RAW_CHUNK__) {
                ret = rpc_request_wait2("replica_rpc_readmd", nid,
                                        msg, sizeof(*msg) + payload, _buf,
                                        MSG_REPLICA, 0, _get_timeout());
                if (unlikely(ret))
                        GOTO(out, ret);
        } else {
                ret = corerpc_postwait("replica_rpc_read", nid,
                                       msg, sizeof(*msg) + payload, NULL,
                                       _buf, MSG_REPLICA, io->size, _get_timeout());
                if (unlikely(ret)) {
                        YASSERT(ret != EINVAL);
                        GOTO(out, ret);
                }
        }

        ANALYSIS_QUEUE(0, IO_WARN, "replica_rpc_read");
out:
        /* ret is 0 here on success */
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Server side of REPLICA_WRITE. The msg_t header and its opaque payload
 * (writer nid, io_t descriptor, magic) are popped off @_buf; what remains
 * in @_buf is the data, which must be exactly io->size bytes (asserted).
 * The reply goes out on the transport the request arrived on.
 *
 * req->buflen is supplied by the peer and decides how much is popped into
 * the PAGE_SIZE scratch buffer, so it is bounds-checked first. The error is
 * EMSGSIZE rather than EINVAL because the clients YASSERT(ret != EINVAL)
 * on this RPC.
 */
static int __replica_srv_write(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t *magic, buflen;
        const nid_t *writer;
        const io_t *io;

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));
        buflen = req->buflen;

        if (unlikely(buflen > PAGE_SIZE - sizeof(*req))) {
                ret = EMSGSIZE;
                GOTO(err_ret, ret);
        }

        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _opaque_decode(req->buf, buflen, &writer, NULL, &io, NULL, &magic, NULL, NULL);

        DBUG("write chunk "CHKID_FORMAT", off %llu, len %u:%u clock %llu\n",
             CHKID_ARG(&req->chkid), (LLU)io->offset, io->size, _buf->len, (LLU)io->vclock.clock);

        YASSERT(_buf->len == io->size);

        ret = replica_srv_write(writer, io, _buf, *magic);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        if (sockid->type == SOCKID_CORENET) {
                corerpc_reply(sockid, msgid, NULL, 0);
        } else {
                rpc_reply(sockid, msgid, NULL, 0);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

int replica_rpc_write(const nid_t *nid, const io_t *io, const buffer_t *_buf, uint32_t magic)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;
        time_t ltime;

        ANALYSIS_BEGIN(0);

        DBUG("write chunk "CHKID_FORMAT", off %llu, len %u clock %llu at %s\n",
             CHKID_ARG(&io->id), (LLU)io->offset, io->size, (LLU)io->vclock.clock,
             network_rname(nid));

        YASSERT(io->size == _buf->len);

        req = (void *)buf;
        req->op = REPLICA_WRITE;
        req->chkid = io->id;

        _opaque_encode(req->buf, &count, net_getnid(), sizeof(*net_getnid()),
                       io, sizeof(*io), &magic, sizeof(magic), NULL);

        req->buflen = count;

        if (likely(io->id.type == __RAW_CHUNK__)) {
                ret = corerpc_postwait("replica_rpc_write", nid,
                                       req, sizeof(*req) + count, _buf,
                                       NULL, MSG_REPLICA, io->size, _get_timeout());
                if (unlikely(ret)) {
                        //DERROR("nid %d ret %d\n", nid->id, ret);
                        YASSERT(ret != EINVAL);
                        GOTO(err_ret, ret);
                }

                ANALYSIS_QUEUE(0, IO_WARN, "replica_rpc_write");
        } else {                
                /* metadata write with rpc because 1) call stack too long. 2) core_request have not timeout. */
                ret = network_connect(nid, &ltime, 1, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /**
                 * TODO timeout设置过长，长时间持有lock，导致别的task的lock timeout
                 * 通过lock.writer可定位到相关线程
                 *
                 * @see lock.c
                 */
                int timeout = _get_timeout();

                ret = rpc_request_wait1("replica_rpc_writemd", nid,
                                        req, sizeof(*req) + count, _buf,
                                        MSG_REPLICA, 0, timeout);
                if (unlikely(ret)) {
                        YASSERT(ret != EINVAL);

                        DWARN("write "CHKID_FORMAT" off %ju len %u clock %ju at %s timeout %d ret %d\n",
                              CHKID_ARG(&io->id),
                              io->offset, io->size, io->vclock.clock,
                              network_rname(nid),
                              timeout,
                              ret);

                        if (ret == EPERM) {
                                // TODO network_close?
                        } else {
                                network_close(nid, "replica rpc1 write fail", &ltime);
                        }

                        GOTO(err_ret, ret);
                }

                ANALYSIS_QUEUE(0, IO_WARN, "replica_rpc_writemd");
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_CONNECT. The payload holds, in order, the master
 * nid, parent chkid, lease token, magic and force flag. These are passed to
 * replica_srv_connect(), and the resulting clockstat_t is sent back.
 * The whole call is timed with ANALYSIS_BEGIN/ANALYSIS_END on both paths.
 */
static int __replica_srv_connect(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len, *force;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *master;
        const chkid_t *parent;
        clockstat_t clockstat;
        lease_token_t *token;
        uint32_t *magic;

        ANALYSIS_BEGIN(0);

        __getmsg(_buf, &msg, &payload_len, scratch);

        _opaque_decode(msg->buf, payload_len,
                       &master, NULL,
                       &parent, NULL,
                       &token, NULL,
                       &magic, NULL,
                       &force, NULL, NULL);

        ret = replica_srv_connect(master, &msg->chkid, parent,
                                  token, *magic, &clockstat, *force);
        if (unlikely(ret)) {
                DBUG("connect "CHKID_FORMAT" peer %s ret (%u) %s\n",
                     CHKID_ARG(&msg->chkid),
                     network_rname(master),
                     ret, strerror(ret));
                GOTO(out, ret);
        }

        rpc_reply(sockid, msgid, &clockstat, sizeof(clockstat));
out:
        /* ret is 0 here on success */
        mem_cache_free(MEM_CACHE_4K, scratch);

        ANALYSIS_END(0, 1000 * 100, NULL);
        return ret;
}

/*
 * Client side of REPLICA_CONNECT: send this node's nid, @parent, @token,
 * @magic and @force for @chkid to @nid and receive the replica's clock state
 * into @clockstat. An EPERM result is passed back to the caller with a plain
 * jump rather than through GOTO(); any other failure goes through GOTO().
 */
int replica_rpc_connect(const nid_t *nid, const chkid_t *chkid, const chkid_t *parent,
                        const lease_token_t *token, uint32_t magic, clockstat_t *clockstat, int force)
{
        int ret;
        uint32_t payload;
        char *scratch = mem_cache_calloc(MEM_CACHE_4K, 1);
        msg_t *msg = (void *)scratch;

        msg->op = REPLICA_CONNECT;
        msg->chkid = *chkid;
        _opaque_encode(msg->buf, &payload, net_getnid(), sizeof(*net_getnid()),
                       parent, sizeof(*parent),
                       token, sizeof(*token),
                       &magic, sizeof(magic),
                       &force, sizeof(force), NULL);

        ret = rpc_request_wait("replica_rpc_connect", nid,
                               msg, sizeof(*msg) + payload,
                               clockstat, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret) && ret != EPERM)
                GOTO(out, ret);
out:
        /* ret is 0 on success, EPERM or another error otherwise */
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Client side of REPLICA_GETCLOCK: fetch the clock state of @chkid from
 * the replica on @nid into @clockstat. The payload carries this node's nid.
 */
int replica_rpc_getclock(const nid_t *nid, const chkid_t *chkid, clockstat_t *clockstat)
{
        int ret;
        uint32_t payload;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)scratch;

        msg->op = REPLICA_GETCLOCK;
        msg->chkid = *chkid;
        _opaque_encode(msg->buf, &payload, net_getnid(), sizeof(*net_getnid()), NULL);

        ret = rpc_request_wait("replica_rpc_getclock", nid,
                               msg, sizeof(*msg) + payload,
                               clockstat, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(out, ret);
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Server side of REPLICA_CREATE. The msg_t header and its payload (pool
 * name, master nid, chkid array, chknum, parent, tier, initzero,
 * meta_version, force) are popped off @_buf. Any bytes left in @_buf are
 * passed to replica_srv_create() as initial data (NULL when empty).
 *
 * The scratch area is heap-allocated with the full message size,
 * presumably because the chkid array can exceed 4K.
 */
static int __replica_srv_create(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len, *initzero = NULL, *tier, *force, *chknum;
        msg_t *msg;
        char *scratch;
        const char *pool;
        const chkid_t *chkid;
        const fileid_t *parent;
        const nid_t *master;
        uint64_t *meta_version;

        ret = ymalloc((void **)&scratch, _buf->len);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        msg = (void *)scratch;
        mbuffer_get(_buf, msg, sizeof(*msg));
        payload_len = msg->buflen;

        ret = mbuffer_popmsg(_buf, msg, payload_len + sizeof(*msg));
        if (unlikely(ret))
                GOTO(out, ret);

        _opaque_decode(msg->buf, payload_len,
                       &pool, NULL,
                       &master, NULL,
                       &chkid, NULL,
                       &chknum, NULL,
                       &parent, NULL,
                       &tier, NULL,
                       &initzero, NULL,
                       &meta_version, NULL,
                       &force, NULL,
                       NULL);

        ret = replica_srv_create(pool, master, chkid, *chknum, parent, *tier, *initzero,
                                 _buf->len ? _buf : NULL, *meta_version, *force);
        if (unlikely(ret))
                GOTO(out, ret);

        rpc_reply(sockid, msgid, NULL, 0);
out:
        /* ret is 0 here on success */
        yfree((void **)&scratch);
        return ret;
}

/*
 * Client side of REPLICA_CREATE: ask @nid to create the @chknum chunks in
 * the @chkid array under @parent in @pool, with the given @tier, @initzero,
 * @meta_version and @force flags, optionally seeded with @initdata.
 *
 * The request buffer is sized for the chkid array plus MEM_CACHE_SIZE4K of
 * slack for the other fields. The payload starts after the msg_t header, so
 * the encoder is given len - sizeof(msg_t) as its capacity (assuming the
 * second argument of _opaque_encode1() is the output capacity). Passing
 * the full len, as before, would allow writes past the end of the
 * allocation.
 */
int replica_rpc_create(const char *pool, const nid_t *nid, const chkid_t *chkid, int chknum,
                       const fileid_t *parent, int tier, int initzero,
                       const buffer_t *initdata, uint64_t meta_version, int force)
{
        int ret, len, room;
        msg_t *req;
        char *buf;
        uint32_t count;

        DBUG("create chunk "CHKID_FORMAT" @ %s\n",
              CHKID_ARG(chkid), network_rname(nid));

        len = (sizeof(*chkid) * chknum + MEM_CACHE_SIZE4K);

        ret = ymalloc((void **)&buf, len);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        ANALYSIS_BEGIN(0);

        req = (void *)buf;
        req->op = REPLICA_CREATE;
        req->chkid = *chkid;

        room = len - (int)sizeof(*req);

        _opaque_encode1(req->buf, room, &count,
                       pool, strlen(pool) + 1,
                       net_getnid(), sizeof(*net_getnid()),
                       chkid, sizeof(*chkid) * chknum,
                       &chknum, sizeof(chknum),
                       parent, sizeof(*parent),
                       &tier, sizeof(tier),
                       &initzero, sizeof(initzero),
                       &meta_version, sizeof(meta_version),
                       &force, sizeof(force),
                       NULL);

        req->buflen = count;
        ret = rpc_request_wait1("replica_rpc_create", nid,
                                req, sizeof(*req) + count, initdata,
                                MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        yfree((void **)&buf);

        ANALYSIS_QUEUE(0, IO_WARN, "replica_rpc_create");

        DBUG("create chunk "CHKID_FORMAT" @ %s finish\n",
              CHKID_ARG(chkid), network_rname(nid));

        return 0;
err_ret:
        yfree((void **)&buf);
        return ret;
}

/*
 * Server side of REPLICA_CLEANUP: clean up req->chkid via
 * replica_srv_cleanup() and reply with no body. The payload is empty.
 */
static int __replica_srv_cleanup(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        __getmsg(_buf, &msg, &payload_len, scratch);
        _opaque_decode(msg->buf, payload_len, NULL, NULL);

        /*
         * Cleanups requested by the volume controller only remove raw data;
         * only cleanups initiated internally by the node itself also remove
         * the metadata, hence the 0 here.
         */
        ret = replica_srv_cleanup(&msg->chkid, 0);
        if (unlikely(ret))
                GOTO(out, ret);

        rpc_reply(sockid, msgid, NULL, 0);
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

int replica_rpc_cleanup(const nid_t *nid, const chkid_t *chkid)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        req = (void *)buf;
        req->op = REPLICA_CLEANUP;
        req->chkid = *chkid;
        _opaque_encode(req->buf, &count,
                       NULL);

        ret = rpc_request_wait("replica_rpc_cleanup", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}


/*
 * Server side of REPLICA_UNLINK: decode the meta_version from the payload,
 * unlink req->chkid via replica_srv_unlink() and reply with no body.
 */
static int __replica_srv_unlink(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint64_t *meta_version;

        __getmsg(_buf, &msg, &payload_len, scratch);
        _opaque_decode(msg->buf, payload_len, &meta_version, NULL, NULL);

        ret = replica_srv_unlink(&msg->chkid, *meta_version);
        if (unlikely(ret))
                GOTO(out, ret);

        rpc_reply(sockid, msgid, NULL, 0);
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

int replica_rpc_unlink(const nid_t *nid, const chkid_t *chkid, uint64_t meta_version)
{
        int ret;
        msg_t *req;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;

        req = (void *)buf;
        req->op = REPLICA_UNLINK;
        req->chkid = *chkid;
        _opaque_encode(req->buf, &count,
                       &meta_version, sizeof(meta_version),
                       NULL);

        ret = rpc_request_wait("replica_rpc_unlink", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_PUSH. The msg_t header and its payload (pool,
 * source nid, parent, tier, vclock, meta_version, flags) are popped off
 * @_buf, and the remaining bytes of @_buf go to replica_srv_push() as the
 * chunk data. The reply goes out on the transport the request arrived on.
 *
 * req->buflen is supplied by the peer and decides how much is popped into
 * the PAGE_SIZE scratch buffer, so it is bounds-checked first. The error is
 * EMSGSIZE rather than EINVAL because the corerpc client asserts
 * ret != EINVAL.
 */
static int __replica_srv_push(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, buflen, *tier, *flags;
        msg_t *req;
        char *pool;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const uint64_t *meta_version;
        const fileid_t *parent;
        const nid_t *nid;
        const vclock_t *vclock;

        req = (void *)buf;
        mbuffer_get(_buf, req, sizeof(*req));

        if (unlikely(req->buflen > PAGE_SIZE - sizeof(*req))) {
                ret = EMSGSIZE;
                GOTO(err_ret, ret);
        }

        buflen = req->buflen;
        ret = mbuffer_popmsg(_buf, req, buflen + sizeof(*req));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        YASSERT(buflen);
        _opaque_decode(req->buf, buflen,
                        &pool, NULL,
                        &nid, NULL,
                        &parent, NULL,
                        &tier, NULL,
                        &vclock, NULL,
                        &meta_version, NULL,
                        &flags, NULL,
                        NULL);

        ret = replica_srv_push(pool, nid, &req->chkid, parent, *tier, vclock,
                               *meta_version, _buf, *flags);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (sockid->type == SOCKID_CORENET) {
                corerpc_reply(sockid, msgid, NULL, 0);
        } else {
                rpc_reply(sockid, msgid, NULL, 0);
        }

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Client side of REPLICA_PUSH: send the chunk data in @_buf for @chkid to
 * @nid, with the pool, this node's nid, @parent (required, asserted), @tier,
 * @vclock, @meta_version and @flags in the payload.
 *
 * Raw chunks go through corerpc; other chunk types connect first and then
 * use the plain rpc path. The ANALYSIS_QUEUE sample is recorded only on
 * success.
 */
int replica_rpc_push(const char *pool, const nid_t *nid, const chkid_t *chkid, const fileid_t *parent,
                     int tier, const vclock_t *vclock, uint64_t meta_version, const buffer_t *_buf, int flags)
{
        int ret;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)scratch;
        uint32_t payload;
        time_t ltime;

        ANALYSIS_BEGIN(0);

        YASSERT(parent);

        msg->op = REPLICA_PUSH;
        msg->chkid = *chkid;
        _opaque_encode(msg->buf, &payload, pool, strlen(pool) + 1,
                       net_getnid(), sizeof(nid_t),
                       parent, sizeof(*parent),
                       &tier, sizeof(tier),
                       vclock, sizeof(*vclock),
                       &meta_version, sizeof(meta_version),
                       &flags, sizeof(flags),
                       NULL);
        msg->buflen = payload;

        if (unlikely(chkid->type != __RAW_CHUNK__)) {
                ret = network_connect(nid, &ltime, 1, 0);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = rpc_request_wait1("replica_rpc_push", nid,
                                        msg, sizeof(*msg) + payload, _buf,
                                        MSG_REPLICA, 0, _get_timeout());
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = corerpc_postwait("replica_rpc_push", nid,
                                       msg, sizeof(*msg) + payload, _buf,
                                       NULL, MSG_REPLICA, 0, _get_timeout());
                if (unlikely(ret)) {
                        YASSERT(ret != EINVAL);
                        GOTO(err_ret, ret);
                }
        }

        mem_cache_free(MEM_CACHE_4K, scratch);

        ANALYSIS_QUEUE(0, IO_WARN, "replica_rpc_push");

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

int replica_rpc_sha1(const nid_t *nid, const chkid_t *chkid, uint64_t version, char *sha1)
{
        int ret;
        msg_t *req;
        uint32_t count;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        req = (void *)buf;
        req->op = REPLICA_SHA1;
        req->chkid = *chkid;
        _opaque_encode(req->buf, &count, &version, sizeof(version), NULL);
        req->buflen = count;

        ret = rpc_request_wait("replica_rpc_sha1", nid,
                               req, sizeof(*req) + count,
                               sha1, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Client side of REPLICA_GETPARENT: ask @nid for the parent of @chkid and
 * store it in @parent. The request is the bare msg_t header, with no
 * payload.
 */
int replica_rpc_getparent(const diskid_t *nid, const chkid_t *chkid, chkid_t *parent)
{
        int ret;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *msg = (void *)scratch;

        msg->op = REPLICA_GETPARENT;
        msg->chkid = *chkid;

        ret = rpc_request_wait("replica_rpc_getparent", nid,
                               msg, sizeof(*msg),
                               parent, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(out, ret);
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

/*
 * Client side of REPLICA_SETPARENT: ask @nid to record @parent as the parent
 * of @chkid. The reply carries no body.
 *
 * The request name passed to rpc_request_wait() was copy-pasted as
 * "replica_rpc_getparent", so setparent calls were reported as getparent.
 * It now names this RPC correctly.
 */
int replica_rpc_setparent(const diskid_t *nid, const chkid_t *chkid, const chkid_t *parent)
{
        int ret;
        uint32_t count;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        msg_t *req;

        req = (void *)buf;
        req->op = REPLICA_SETPARENT;
        req->chkid = *chkid;
        _opaque_encode(req->buf, &count, parent, sizeof(*parent), NULL);

        ret = rpc_request_wait("replica_rpc_setparent", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_SETCLOCK: decode the master nid and the new
 * clockstat_t, apply them with replica_srv_setclock() and reply with no
 * body. Failures are logged with the peer's name.
 */
static int __replica_srv_setclock(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        const nid_t *master;
        clockstat_t *clockstat;

        __getmsg(_buf, &msg, &payload_len, scratch);
        _opaque_decode(msg->buf, payload_len, &master, NULL, &clockstat, NULL, NULL);

        ret = replica_srv_setclock(master, &msg->chkid, clockstat);
        if (unlikely(ret)) {
                DWARN("setclock "CHKID_FORMAT" ret %u peer %s\n",
                      CHKID_ARG(&msg->chkid), ret,
                      network_rname(master));
                GOTO(out, ret);
        }

        rpc_reply(sockid, msgid, NULL, 0);
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

int replica_rpc_setclock(const nid_t *nid, const chkid_t *chkid, const clockstat_t *clockstat)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        req = (void *)buf;
        req->op = REPLICA_SETCLOCK;
        req->chkid = *chkid;
        _opaque_encode(req->buf, &count, net_getnid(), sizeof(*net_getnid()),
                       clockstat, sizeof(*clockstat), NULL);

        ret = rpc_request_wait("replica_rpc_setclock", nid,
                               req, sizeof(*req) + count,
                               NULL, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);

        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Server side of REPLICA_ONLINE: query replica_srv_diskonline() for
 * req->chkid and reply with the resulting int flag. The client sends no
 * payload, so only the header's chkid is used.
 */
static int __replica_srv_diskonline(const sockid_t *sockid, const msgid_t *msgid, buffer_t *_buf)
{
        int ret, payload_len, online;
        msg_t *msg;
        char *scratch = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);

        __getmsg(_buf, &msg, &payload_len, scratch);

        ret = replica_srv_diskonline(&msg->chkid, &online);
        if (unlikely(ret))
                GOTO(out, ret);

        rpc_reply(sockid, msgid, &online, sizeof(online));
out:
        mem_cache_free(MEM_CACHE_4K, scratch);
        return ret;
}

int replica_rpc_diskonline(const nid_t *nid, const chkid_t *chkid, int *online)
{
        int ret;
        char *buf = mem_cache_calloc1(MEM_CACHE_4K, PAGE_SIZE);
        uint32_t count;
        msg_t *req;

        req = (void *)buf;
        req->op = REPLICA_ONLINE;
        req->chkid = *chkid;
        count = 0;
        //_opaque_encode(req->buf, &count,
        //&num, sizeof(num), NULL);

        ret = rpc_request_wait("replica_rpc_diskonline", nid,
                               req, sizeof(*req) + count,
                               online, NULL,
                               MSG_REPLICA, 0, _get_timeout());
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mem_cache_free(MEM_CACHE_4K, buf);
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_4K, buf);
        return ret;
}

/*
 * Report whether replica @idx of @chkinfo sits on an online disk. The query
 * is answered locally when that replica's node is this node, and over RPC
 * otherwise. Any error is reported as "offline" (0) rather than returned.
 */
int replica_diskonline(const chkinfo_t *chkinfo, int idx)
{
        int ret, online = 0;
        const nid_t *nid = &chkinfo->diskid[idx].id;

        if (net_islocal(nid))
                ret = replica_srv_diskonline(&chkinfo->id, &online);
        else
                ret = replica_rpc_diskonline(nid, &chkinfo->id, &online);

        if (ret)
                GOTO(err_ret, ret);

        return online;
err_ret:
        return 0;
}

static void __request_handler(void *arg)
{
        int ret;
        msg_t req;
        sockid_t sockid;
        msgid_t msgid;
        buffer_t buf;
        __request_handler_func__ handler;
        const char *name;

        request_trans(arg, &sockid, &msgid, &buf, NULL);

        if (buf.len < sizeof(req)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        mbuffer_get(&buf, &req, sizeof(req));

        DBUG("new op %u from %s, id (%u, %x)\n", req.op,
             _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint);

#if 0
        if (!netable_connected(net_getadmin())) {
                ret = ENONET;
                GOTO(err_ret, ret);
        }
#endif

        __request_get_handler(req.op, &handler, &name);
        if (handler == NULL) {
                ret = ENOSYS;
                DWARN("error op %u\n", req.op);
                GOTO(err_ret, ret);
        }

        schedule_task_setname(name);

        ret = handler(&sockid, &msgid, &buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_free(&buf);

        DBUG("reply op %u from %s, id (%u, %x)\n", req.op,
              _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint);

        return ;
err_ret:
        mbuffer_free(&buf);
        if (sockid.type == SOCKID_CORENET) {
                corerpc_reply_error(&sockid, &msgid, ret);
        } else {
                rpc_reply_error(&sockid, &msgid, ret);
        }
        DBUG("error op %u from %s, id (%u, %x)\n", req.op,
             _inet_ntoa(sockid.addr), msgid.idx, msgid.figerprint);
        return;
}

int replica_rpc_init()
{
        DINFO("replica rpc init\n");

        __request_set_handler(REPLICA_CREATE, __replica_srv_create, "replica_srv_create");
        __request_set_handler(REPLICA_UNLINK, __replica_srv_unlink, "replica_srv_unlink");
        __request_set_handler(REPLICA_CLEANUP, __replica_srv_cleanup, "replica_srv_cleanup");
        __request_set_handler(REPLICA_CONNECT, __replica_srv_connect, "replica_srv_connect");
        __request_set_handler(REPLICA_GETCLOCK, __replica_srv_getclock, "replica_srv_getclock");
        __request_set_handler(REPLICA_SETCLOCK, __replica_srv_setclock, "replica_srv_setclock");
        __request_set_handler(REPLICA_WRITE, __replica_srv_write, "replica_srv_write");
        __request_set_handler(REPLICA_READ, __replica_srv_read, "replica_srv_read");
        __request_set_handler(REPLICA_PUSH, __replica_srv_push, "replica_srv_push");
        __request_set_handler(REPLICA_SHA1, __replica_srv_sha1, "replica_srv_sha1");
        __request_set_handler(REPLICA_GETPARENT, __replica_srv_getparent, "replica_srv_getparent");  
        __request_set_handler(REPLICA_SETPARENT, __replica_srv_setparent, "replica_srv_setparent");
        __request_set_handler(REPLICA_ONLINE, __replica_srv_diskonline, "replica_srv_diskonline");
      
        if (ng.daemon) {
                rpc_request_register(MSG_REPLICA, __request_handler, NULL);

                corerpc_register(MSG_REPLICA, __request_handler, NULL);
        }

        return 0;
}
